package cn.uncode.rpc.core.protocol.rpc;

import java.util.HashMap;
import java.util.Map;

import cn.uncode.rpc.common.log.Logger;
import cn.uncode.rpc.common.log.LoggerFactory;
import cn.uncode.rpc.core.AbstractExporter;
import cn.uncode.rpc.core.AbstractInvoker;
import cn.uncode.rpc.core.Exporter;
import cn.uncode.rpc.core.Future;
import cn.uncode.rpc.core.FutureListener;
import cn.uncode.rpc.core.Invoker;
import cn.uncode.rpc.core.Request;
import cn.uncode.rpc.core.Response;
import cn.uncode.rpc.core.URL;
import cn.uncode.rpc.core.URLParam;
import cn.uncode.rpc.core.protocol.AbstractProtocol;
import cn.uncode.rpc.exception.FrameworkException;
import cn.uncode.rpc.exception.TransportException;
import cn.uncode.rpc.spi.ExtensionLoader;
import cn.uncode.rpc.spi.SpiMeta;
import cn.uncode.rpc.transport.Client;
import cn.uncode.rpc.transport.ChannelFactory;
import cn.uncode.rpc.transport.Server;
import cn.uncode.rpc.transport.support.ProtectedRequestRoutingHandler;
import cn.uncode.rpc.transport.support.RequestRoutingHandler;
import cn.uncode.rpc.util.FrameworkUtil;



/**
 * @author maijunsheng
 * @version 创建时间：2013-5-22
 */
@SpiMeta(name = "uncode")
public class DefaultRpcProtocol extends AbstractProtocol {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRpcProtocol.class);

    // 多个service可能在相同端口进行服务暴露，因此来自同个端口的请求需要进行路由以找到相应的服务，同时不在该端口暴露的服务不应该被找到
    private Map<String, RequestRoutingHandler> ipPort2RequestRouter = new HashMap<String, RequestRoutingHandler>();

    @Override
    protected <T> Exporter<T> createExporter(Invoker<T> provider, URL url) {
        return new DefaultRpcExporter<T>(provider, url);
    }

    @Override
    protected <T> Invoker<T> createInvoker(Class<T> clz, URL url, URL serviceUrl) {
        return new DefaultRpcInvoker<T>(clz, url, serviceUrl);
    }

    /**
     * rpc provider
     *
     * @param <T>
     * @author maijunsheng
     */
    class DefaultRpcExporter<T> extends AbstractExporter<T> {
        private Server server;
        private ChannelFactory endpointFactory;

        public DefaultRpcExporter(Invoker<T> provider, URL url) {
            super(provider, url);

            RequestRoutingHandler requestRouter = initRequestRoutingHandler(url);
            endpointFactory = ExtensionLoader.getExtensionLoader(ChannelFactory.class).getExtension(
            		url.getParameter(URLParam.ENDPOINT_FACTORY.getName(), URLParam.ENDPOINT_FACTORY.getValue()));
            server = endpointFactory.createServer(url, requestRouter);
        }

        @SuppressWarnings("unchecked")
        @Override
        public void unexport() {
            String protocolKey = FrameworkUtil.getProtocolKey(url);
            String ipPort = url.getServerPortStr();

            Exporter<T> exporter = (Exporter<T>) exporterMap.remove(protocolKey);

            if (exporter != null) {
                exporter.destroy();
            }

            synchronized (ipPort2RequestRouter) {
            	RequestRoutingHandler requestRouter = ipPort2RequestRouter.get(ipPort);
                if (requestRouter != null) {
                    requestRouter.removeProvider(invoker);
                }
            }

            LOGGER.info("DefaultRpcExporter unexport Success: url={}", url);
        }

        @Override
        protected boolean doInit() {
            boolean result = server.start();
            return result;
        }

        @Override
        public boolean isAvailable() {
            return server.isAvailable();
        }

        @Override
        public void destroy() {
            endpointFactory.safeReleaseResource(server, url);
            LOGGER.info("DefaultRpcExporter destory Success: url={}", url);
        }

        private RequestRoutingHandler initRequestRoutingHandler(URL url) {
        	RequestRoutingHandler requestRouter = null;
            String ipPort = url.getServerPortStr();

            synchronized (ipPort2RequestRouter) {
                requestRouter = ipPort2RequestRouter.get(ipPort);

                if (requestRouter == null) {
                    requestRouter = new ProtectedRequestRoutingHandler(invoker);
                    ipPort2RequestRouter.put(ipPort, requestRouter);
                } else {
                    requestRouter.addProvider(invoker);
                }
            }

            return requestRouter;
        }
    }

    /**
     * rpc referer
     *
     * @param <T>
     * @author maijunsheng
     */
    class DefaultRpcInvoker<T> extends AbstractInvoker<T> {
        private Client client;
        private ChannelFactory endpointFactory;

        public DefaultRpcInvoker(Class<T> clz, URL url, URL serviceUrl) {
            super(url, clz);

            endpointFactory =
                    ExtensionLoader.getExtensionLoader(ChannelFactory.class).getExtension(
                            url.getParameter(URLParam.ENDPOINT_FACTORY.getName(), URLParam.ENDPOINT_FACTORY.getValue()));
            client = endpointFactory.createClient(url);
        }

        @Override
        public Response doInvoke(Request request) {
        	 try {
                 // 为了能够实现跨group请求，需要使用server端的group。
                 request.setAttachment(URLParam.GROUP.getName(), ":todo");
                 return client.send(request);
             } catch (TransportException exception) {
                 throw new FrameworkException("DefaultRpcReferer call Error: url=" + url.getUri(), exception);
             }
		}

        @Override
        protected void decrActiveCount(Request request, Response response) {
            if (response == null || !(response instanceof Future)) {
            	activeInvokeCount.decrementAndGet();
                return;
            }

			Future future = (Future) response;

            future.addListener(new FutureListener() {
                @Override
                public void operationComplete(Future future) throws Exception {
                	activeInvokeCount.decrementAndGet();
                }
            });
        }

        @Override
        protected boolean doInit() {
            boolean result = client.connect();

            return result;
        }

        @Override
        public boolean isAvailable() {
            return client.isAvailable();
        }

        @Override
        public void destroy() {
            endpointFactory.safeReleaseResource(client, url);
            LOGGER.info("DefaultRpcReferer destory client: url={}" + url);
        }

		
    }
}
